///|
/// Lifecycle state stored in `Task::status`.
priv enum TaskStatus {
  // The task recorded an error; the payload is that error.
  Fail(Error)
  // Initial state: the task has not finished yet.
  Running
  // The task finished without an error having been recorded.
  Done
}

///|
/// A `Task` represents a waitable task context that can manage waitables and
/// child coroutines.
struct Task {
  // Handle of the waitable set backing this task (also the key in `task_map`).
  id : Int
  // Registered waitables and the coroutine to wake for each, keyed by waitable handle.
  children : Map[Int, (&Waitable, Coroutine)]
  // Cleanup callbacks registered through `Task::add_defer`.
  task_defer : Array[() -> Unit raise]
  // Handles of waitables registered through `Task::add_waitable`, in insertion order.
  resources : @deque.Deque[Int]
  // Coroutine started by `Task::with_waitable_set`, if any.
  mut task : Coroutine?
  // Number of waitables currently joined to the set and not yet removed/cancelled.
  mut waiting : Int
  // Current lifecycle state.
  mut status : TaskStatus
}

///|
/// Global registry of tasks keyed by `Task::id`.
/// Entries are inserted by `current_task` / `current_waitable_set` and removed
/// when the coroutine started by `Task::with_waitable_set` finishes.
pub let task_map : Map[Int, Task] = {}

///|
/// Creates a task backed by a newly allocated waitable set.
///
/// The new set's handle is passed to `context_set` and becomes the task id.
/// The returned task starts in the `Running` state with no children, no
/// deferred callbacks and a waiting count of zero.
pub fn Task::new() -> Task {
  let set_handle = waitable_set_new()
  _async_debug("waitable-set-new(\{set_handle})")
  context_set(set_handle)
  Task::{
    id: set_handle,
    task: None,
    status: Running,
    waiting: 0,
    children: {},
    task_defer: [],
    resources: @deque.Deque::new(),
  }
}

///|
/// Wraps an existing, non-zero waitable-set handle in a fresh `Task`.
///
/// `raw` is passed to `context_set` and used as the task id. The `guard`
/// has no `else` branch, so a zero handle terminates the program.
pub fn Task::from_raw(raw : Int) -> Task {
  guard raw != 0
  context_set(raw)
  _async_debug("context-set(\{raw})")
  Task::{
    id: raw,
    task: None,
    status: Running,
    waiting: 0,
    children: {},
    task_defer: [],
    resources: @deque.Deque::new(),
  }
}

///|
/// Returns the recorded error when the task is in the `Fail` state,
/// and `None` for every other state.
pub fn Task::is_fail(self : Self) -> Error? {
  if self.status is Fail(err) {
    Some(err)
  } else {
    None
  }
}

///|
/// Returns `true` when the waiting counter is zero, i.e. no registered
/// waitable is still outstanding.
pub fn Task::no_wait(self : Self) -> Bool {
  self.waiting is 0
}

///|
/// Returns `true` once the task has left the `Running` state, whether it
/// ended as `Done` or as `Fail`.
pub fn Task::is_done(self : Self) -> Bool {
  !(self.status is Running)
}

///|
/// Returns the task id, which is the handle of the waitable set backing
/// this task.
pub fn Task::handle(self : Self) -> Int {
  self.id
}

///|
/// Calls `waitable_set_wait` on this task's waitable set.
///
/// Returns `(event, p0, p1)`: the value returned by the call followed by the
/// two integers read back from the result buffer handed to it.
pub fn Task::blocking_wait(self : Self) -> (Int, Int, Int) {
  // Two-slot buffer; presumably filled by the callee through the pointer.
  let payload : FixedArray[Int] = FixedArray::make(2, 0)
  let event = waitable_set_wait(self.id, int_array2ptr(payload))
  let first = payload[0]
  let second = payload[1]
  _async_debug("waitable_set_wait(\{event}, \{first}, \{second})")
  (event, first, second)
}

///|
/// Calls `waitable_set_poll` on this task's waitable set.
///
/// Returns `(event, p0, p1)`: the value returned by the call followed by the
/// two integers read back from the result buffer handed to it.
pub fn Task::blocking_poll(self : Self) -> (Int, Int, Int) {
  // Two-slot buffer; presumably filled by the callee through the pointer.
  let payload : FixedArray[Int] = FixedArray::make(2, 0)
  let event = waitable_set_poll(self.id, int_array2ptr(payload))
  let first = payload[0]
  let second = payload[1]
  _async_debug("waitable-set-poll(\{event}, \{first}, \{second})")
  (event, first, second)
}

///|
/// Registers `waitable` with this task.
///
/// The waitable is joined to the task's waitable set, stored in `children`
/// together with `coro` (the coroutine `callback` wakes for it), its handle
/// is appended to `resources`, and the waiting counter is incremented.
pub fn[T : Waitable] Task::add_waitable(
  self : Self,
  waitable : T,
  coro : Coroutine,
) -> Unit {
  let handle = waitable.handle()
  waitable_join(handle, self.id)
  self.children[handle] = (waitable, coro)
  self.resources.push_back(handle)
  _async_debug("waitable-set-join(\{handle}, \{self.id})")
  self.waiting = self.waiting + 1
}

///|
/// Detaches a waitable from the waitable set (joins it to set `0`) and
/// decrements the waiting counter.
///
/// The waitable itself is not dropped here and stays in `children`;
/// see `Task::drop_waitable`.
pub fn[T : Waitable] Task::remove_waitable(self : Self, state : T) -> Unit {
  let handle = state.handle()
  _async_debug("waitable-set-join(\{handle}, 0)")
  waitable_join(handle, 0)
  self.waiting = self.waiting - 1
}

///|
/// Drops a waitable and forgets it: its handle is removed from `resources`
/// (first matching entry) and its entry is removed from `children`.
///
/// The waiting counter is not touched; use `Task::remove_waitable` for that.
pub fn[T : Waitable] Task::drop_waitable(self : Self, state : T) -> Unit {
  // Read the handle before dropping so the bookkeeping below always uses the
  // handle the waitable was registered under, even if `drop` invalidates it.
  let handle = state.handle()
  let _ = state.drop()
  if self.resources.search(handle) is Some(idx) {
    let _ = self.resources.remove(idx)

  }
  self.children.remove(handle)
}

///|
/// Cancels a waitable, detaches it from the waitable set and force-drops it.
///
/// Steps: join the waitable to set `0`, decrement the waiting counter, call
/// `cancel` and `drop` on it, then remove its handle from `resources` and its
/// entry from `children` so no stale bookkeeping is left behind.
pub fn[T : Waitable] Task::cancel_waitable(self : Self, state : T) -> Unit {
  // Capture the handle up front: it is needed after `drop`, which may
  // invalidate the waitable.
  let handle = state.handle()
  waitable_join(handle, 0)
  _async_debug("waitable-set-join(\{handle}, 0)")
  self.waiting -= 1
  state.cancel()
  let _ = state.drop()
  // Mirror `drop_waitable`: without this the handle would linger in
  // `resources` (and be duplicated if the same handle value is registered again).
  if self.resources.search(handle) is Some(idx) {
    let _ = self.resources.remove(idx)

  }
  self.children.remove(handle)
}

///|
/// Resets the current task context to `0` and then drops this task's
/// waitable set.
pub fn Task::drop(self : Self) -> Unit {
  context_set(0)
  _async_debug("context-set(0)")
  // Dropped last, after the context has been cleared and logged.
  waitable_set_drop(self.id)
}

///|
/// Spawns a coroutine for `f` and runs the scheduler once, without waiting
/// for the coroutine to complete. The coroutine handle is discarded.
pub fn Task::spawn(_self : Self, f : async () -> Unit) -> Unit {
  spawn(f) |> ignore
  // give the freshly spawned coroutine a chance to start
  rschedule()
}

///|
/// Spawns a coroutine for `f`, runs the scheduler once so it can start, and
/// then waits for that coroutine via `Coroutine::wait`.
pub async fn Task::wait(_ : Self, f : async () -> Unit) -> Unit {
  let child = spawn(f)
  // give the freshly spawned coroutine a chance to start
  rschedule()
  Coroutine::wait(child)
}

///|
/// Registers a cleanup callback on this task.
///
/// Callbacks are appended to `task_defer`; the coroutine started by
/// `Task::with_waitable_set` pops and runs them when it finishes.
pub fn Task::add_defer(self : Self, f : () -> Unit raise) -> Unit {
  self.task_defer.push(f)
}

///|
/// Event callback for the current task.
///
/// - `event`: raw event code, decoded with `Event::decode`.
/// - `waitable_id`: handle of the waitable the event refers to; used as the
///   key into the task's `children` for waitable events.
/// - `code`: event payload forwarded to the waitable's `update`.
///
/// Returns an encoded `CallbackCode`: `Wait(task.id)` while waitables are
/// still outstanding after a waitable event, `Exit` otherwise.
pub fn callback(event : Int, waitable_id : Int, code : Int) -> Int {
  let event = Event::decode(event)
  _async_debug("callback(\{event}, \{waitable_id}, \{code})")
  let task = match current_waitable_set() {
    Some(task) => task
    None => current_task()
  }
  // Handle the event for the current waitable task
  match event {
    FutureRead | FutureWrite | StreamRead | StreamWrite | Subtask => {
      // NOTE(review): indexing assumes `waitable_id` is registered in `children`.
      let (state, coro) = task.children[waitable_id]
      state.update(code~)
      // schedule next coroutine
      coro.wake()
      rschedule()
      if task.no_wait() && task.task is Some(parent) {
        // run the parent coroutine when all waitables are done
        // parent coroutine may execute return/cancel
        parent.wake()
        rschedule()
        return CallbackCode::Exit.encode()
      }
      return CallbackCode::Wait(task.id).encode()
    }
    TaskCancel => {
      if task.task is Some(parent) {
        parent.wake()
      }
      // Snapshot the children first: `cancel_waitable` removes entries from
      // `task.children`, so the map must not be iterated while it runs.
      let pending = task.children.values().to_array()
      for child in pending {
        let (state, coro) = child
        task.cancel_waitable(state)
        coro.cancel()
      }
      rschedule()
      return CallbackCode::Exit.encode()
    }
    None => {
      rschedule()
      return CallbackCode::Exit.encode()
    }
  }
}

///|
/// Runs `f` inside a newly spawned parent coroutine bound to this task and
/// returns that coroutine.
///
/// - `f`: async body executed with this task; an error it raises is recorded
///   as `Fail` if the task is still `Running`.
/// - `is_drop`: when `true`, `Task::drop` is called at the end of cleanup
///   (defaults to `false`).
///
/// After `f` returns, the coroutine suspends once if waitables are still
/// outstanding. On exit its `defer` block drops the remaining registered
/// waitables, finalizes the status, removes the task from `task_map` and runs
/// the callbacks registered with `Task::add_defer`.
pub fn Task::with_waitable_set(
  self : Self,
  f : async (Self) -> Unit,
  is_drop? : Bool = false,
) -> Coroutine noraise {
  let parent = spawn(async fn() -> Unit noraise {
    self.status = Running
    defer {
      // Drop every waitable that is still registered, in registration order.
      // Handles whose entry was already removed from `children` are skipped.
      while self.resources.pop_front() is Some(handle) {
        let state = self.children.get(handle)
        if state is Some((state, _)) {
          let _ = state.drop()
          self.children.remove(handle)
        }
      }
      // No error was recorded while running: mark the task as completed.
      if self.status is Running {
        self.status = Done
      }
      task_map.remove(self.id)

      // Run the registered cleanup callbacks, most recently added first.
      // An error is recorded only while the status is still `Done`, so an
      // earlier failure is never overwritten and only the first cleanup
      // error is kept.
      while self.task_defer.pop() is Some(defer_block) {
        defer_block() catch {
          err => if self.status is Done { self.status = Fail(err) }
        }
      }

      // runner will drop the waitable set
      // export async function needs to keep the waitable set
      if is_drop {
        self.drop()
      }
    }
    // Run the user body; keep only the first error.
    f(self) catch {
      err => if self.status is Running { self.status = Fail(err) }
    }
    // Waitables are still outstanding: suspend until this coroutine is woken
    // (`callback` wakes it when nothing is left to wait for, or on cancel).
    if !self.no_wait() {
      _async_debug("task-wait-loop(\{self.id})")
      suspend() catch {
        err => if self.status is Running { self.status = Fail(err) }
      }
    }
  })
  self.task = Some(parent)
  // start the parent coroutine
  parent.run()
  rschedule()
  parent
}

///|
/// Looks up the task for the current context.
///
/// Returns `None` when the context value is `0`. Otherwise returns the task
/// registered in `task_map` under that value, creating and registering one
/// with `Task::from_raw` if none exists yet.
fn current_waitable_set() -> Task? {
  let ctx = context_get()
  _async_debug("context-get(\{ctx})")
  guard ctx != 0 else { return None }
  if task_map.get(ctx) is Some(known) {
    return Some(known)
  }
  // Unknown context: adopt the raw handle and remember it.
  let adopted = Task::from_raw(ctx)
  task_map[adopted.id] = adopted
  Some(adopted)
}

///|
/// Returns the task for the current context, creating one when necessary.
///
/// A context value of `0` yields a brand-new task from `Task::new`; a
/// non-zero value yields the task registered in `task_map`, or a task built
/// with `Task::from_raw` if none is registered. Newly created tasks are
/// stored in `task_map` under their id.
pub fn current_task() -> Task {
  let ctx = context_get()
  if ctx != 0 && task_map.get(ctx) is Some(known) {
    return known
  }
  let fresh = if ctx == 0 { Task::new() } else { Task::from_raw(ctx) }
  task_map[fresh.id] = fresh
  fresh
}